Stream factories
ストリームファクトリは、IRPの設計の初期段階で追加されました。Rxには多くの既存のサブジェクトタイプ(Subject<T>、AsyncSubject<T>、BehaviorSubject<T>、ReplaySubject<T>など)があるため、ストリームファクトリがIRPで果たすべき役割があることはすぐにわかりました。その意味では、ストリームファクトリは、サブジェクトコンストラクタの関数定義に過ぎません。
Stream factories were added early on during the design of IRP. It was immediately apparent that stream factories had a role to play in IRP because of the many existing subject types in Rx (e.g. Subject<T>, AsyncSubject<T>, BehaviorSubject<T>, and ReplaySubject<T>). In that sense, stream factories are merely function definitions for subject constructors.
IRPシステムでは、これらの基本的なサブジェクトの実装に加えて、外部サービスに支えられたストリームのためのストリーム・ファクトリを定義することができます。例えば、ストリーム・ファクトリは、信頼性のある永続的なキューへの接続文字列をパラメータとすることができます。接続文字列に具体的な値を指定してストリーム・ファクトリを呼び出すと、IRPシステム内にストリームが作成されます。このストリームは、観測可能なソースとオブザーバー・シンクの両方として使用でき、下層にある信頼性の高いキューによって物理的にバックアップされます。これにより、SQL Server Integration Services(SSIS)のExtract-Transform-Load(ETL)プラットフォームのソースとデスティネーションに似た方法で、既存のイベントストリーミングシステムへの「アダプタ」を作成することができます。
Besides these essential subject implementations, and IRP system can define stream factories for streams that are backed by external services. For example, a stream factory could be parameterized on a connection string to some persistent reliable queue. Upon invoking the stream factory with a concrete value for a connection string, a stream is created in the IRP system that can be used both as an observable source and an observer sink, being physically backed by the reliable queue underneath. This enables the creation of “adapters” to existing event streaming systems in a way similar to sources and destinations in the SQL Server Integration Services (SSIS) Extract-Transform-Load (ETL) platform.
code:C#
// Rx
interface ISubject<T> : ISubject<T, T> {}
interface ISubject<out T, in U> : IObservable<T>, IObserver<U> {}
class Program
{
ISubject<T> CreateReplay(int count) => new ReplaySubject<T>(count);
}
// IRP
interface IAsyncReactiveQubject<T> : IAsyncReactiveQubject<T, T> {}
interface IAsyncReactiveQubject<out T, in U> : IAsyncReactiveQbservable<T>, IAsyncReactiveQbserver<U> {}
class Program
{
Task<ISubject<T>> CreateReplay(Uri streamId, int count) =>
AsyncReactiveQubject.Replay(streamId, count);
}